Added flush buffer.
authorJeroen van der Heijden <jeroen@transceptor.technology>
Fri, 21 Sep 2018 16:16:46 +0000 (18:16 +0200)
committerJeroen van der Heijden <jeroen@transceptor.technology>
Fri, 21 Sep 2018 16:16:46 +0000 (18:16 +0200)
include/siri/db/buffer.h
src/siri/db/buffer.c
src/siri/db/insert.c
src/siri/heartbeat.c

index 0c122c4fbd16a1a35942a706f3d6cc4760dbd048..50c30baa08d5c763e5a05d0b72ba4dd0f6b69a65 100644 (file)
@@ -15,6 +15,7 @@
 #include <siri/db/db.h>
 #include <siri/db/series.h>
 #include <siri/db/points.h>
+#include <unistd.h>
 
 #define MAX_BUFFER_SZ 10485760
 
@@ -37,4 +38,7 @@ int siridb_buffer_write_point(
         uint64_t * ts,
         qp_via_t * val);
 
+int siridb_buffer_fsync(siridb_t * siridb);
+
+
 #endif  /* SIRIDB_BUFFER_H_ */
index b641407fa7ec6456bdd0f7ce5d7c9e85932c5ead..3306f39f2114d147338ad919e6f0117419995131 100644 (file)
@@ -96,6 +96,12 @@ int siridb_buffer_new_series(siridb_t * siridb, siridb_series_t * series)
             BUFFER_create_new(siridb, series);
 }
 
+int siridb_buffer_fsync(siridb_t * siridb)
+{
+    int buffer_fd = fileno(siridb->buffer_fp);
+    return (buffer_fd != -1) ? fsync(buffer_fd) : -1;
+}
+
 /*
  * Returns 0 if successful or -1 in case of an error.
  */
index 1cfe1712608513e79b16f11d668d5bfd5d343488..1d9921622c702f5fc0c0108d9170f9b601ee2f3e 100644 (file)
@@ -881,6 +881,7 @@ static void INSERT_local_task(uv_async_t * handle)
 
     siridb_insert_local_t * ilocal = (siridb_insert_local_t *) handle->data;
     qp_unpacker_t * unpacker = &ilocal->unpacker;
+    siridb_t * siridb;
 
     /*
      * we check for siri_err because siridb_series_add_point()
@@ -900,7 +901,7 @@ static void INSERT_local_task(uv_async_t * handle)
         return;
     }
 
-    siridb_t * siridb = ilocal->siridb;
+    siridb = ilocal->siridb;
 
     if (siridb->buffer_fp == NULL && siridb_buffer_open(siridb))
     {
@@ -951,6 +952,7 @@ static void INSERT_local_task(uv_async_t * handle)
             ilocal->status = INSERT_LOCAL_ERROR;
         }
     }
+
     uv_mutex_unlock(&siridb->series_mutex);
     uv_mutex_unlock(&siridb->shards_mutex);
 
index cf6fde4f33bde3b123f3f608277f2159433743a1..6d55788930b0ea95780e265c6283a471766116f9 100644 (file)
@@ -16,6 +16,7 @@
 #include <logger/logger.h>
 #include <siri/db/server.h>
 #include <siri/heartbeat.h>
+#include <siri/db/buffer.h>
 #include <uv.h>
 
 #if DEBUG
@@ -92,5 +93,11 @@ static void HEARTBEAT_cb(uv_timer_t * handle __attribute__((unused)))
 
         siridb_node = siridb_node->next;
     }
+
+    /* flush the buffer, maybe on each insert or another interval? */
+    if (siridb_buffer_fsync(siridb))
+    {
+        log_critical("fsync() has failed on the buffer file");
+    }
 }